dynamically load custom FileIO implementation #1618
Conversation
HadoopFileIO.class.getName(),
HadoopFileIO.class,
new Class<?>[] { Configuration.class },
new Object[] { conf });
I don't think this is actually cleaner than just using the reflection utilities here. I understand wanting to reuse code, but I don't think that this operation is a good candidate for it.
This mixes property checking with the reflection operation, and as a result the default implementation is now loaded using reflection as well. That means we lose the compile-time check that we're instantiating HadoopFileIO correctly, and we lose the ability to refactor easily.
This also doesn't support more than one constructor. For LocationProvider, the constructor could accept either location and properties or no args, but this only checks for a single constructor and is harder to read as well.
The first thing I would change is to move the property check out of the utility method and directly instantiate HadoopFileIO in an else block. After making that change, there's little value in wrapping the reflection helpers. The main complexity is catching NoSuchMethodException to return a better error message (which is optional), but making that generic ends up being quite complex:
List<List<String>> allowedArgTypesString = allowedArgTypes.stream()
    .map(argTypes -> Arrays.stream(argTypes).map(Class::getName).collect(Collectors.toList()))
    .collect(Collectors.toList());
throw new IllegalArgumentException(String.format(
    "Unable to find a constructor for implementation %s of %s. " +
    "Make sure the implementation is in classpath, and that it either " +
    "has a public no-arg constructor or one of the following constructors: %s ",
    impl, classInterface, allowedArgTypesString), e);
The method supports a no-arg constructor, and it also allows multiple constructor options. The main reason for refactoring is that the original code has two lengthy try-catch blocks: one for NoSuchMethodException when looking for a constructor, and one for ClassCastException when trying to cast the class.
But I agree that it loses the ability to check the default implementation at compile time. What if I leave the default class construction in the else blocks, and keep the util only for initializing the dynamic class?
If you leave the property handling and default classes in else blocks, then the only value left is a single method that throws slightly friendlier exception messages. But the cost is that both the call site and the implementation become quite a bit more complicated and harder to read. I don't really think it is worth the refactor.
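For reference, a minimal sketch of the pattern being suggested, assuming a standalone helper (the wrapper class and method names are illustrative; FileIO, HadoopFileIO, DynConstructors, and the property key come from the codebase and this PR's diff):

import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.common.DynConstructors;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

class FileIOLoaderSketch {  // illustrative holder class, not part of the PR
  static FileIO loadFileIO(Map<String, String> properties, Configuration conf) {
    String impl = properties.get(TableProperties.WRITE_FILE_IO_IMPL);
    if (impl == null) {
      // default path stays a direct call, so it is checked at compile time
      return new HadoopFileIO(conf);
    }
    try {
      // only the dynamically configured implementation goes through reflection
      DynConstructors.Ctor<FileIO> ctor = DynConstructors.builder(FileIO.class)
          .impl(impl, Configuration.class)
          .buildChecked();
      return ctor.newInstance(conf);
    } catch (NoSuchMethodException e) {
      throw new IllegalArgumentException(String.format(
          "Cannot find a constructor for FileIO implementation %s", impl), e);
    }
  }
}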
current() == null ? new HashMap<>() : current().properties(),
TableProperties.WRITE_FILE_IO_IMPL,
HadoopFileIO.class.getName(),
HadoopFileIO.class,
The interface should be FileIO instead of HadoopFileIO.
It's HadoopFileIO in the original class. Is that intended, or a bug?
Since the same concrete class was always instantiated, I think we used the class in case we needed anything specific to HadoopFileIO. But it can be FileIO since nothing specific is used.
if (defaultFileIo == null) {
  defaultFileIo = new HadoopFileIO(conf);
  defaultFileIo = ClassLoaderUtil.fromProperty(
      current() == null ? new HashMap<>() : current().properties(),
We should move instantiation to a separate method that accepts TableMetadata so that we can use the correct metadata in transactions.
}

static class DefaultLocationProvider implements LocationProvider {
public static class DefaultLocationProvider implements LocationProvider {
There's no need to make this public, and I'd like to avoid exposing new classes through the public API. If we need to use reflection, we can always use hiddenImpl to set the accessible flag. But I think it would be better to instantiate these classes directly like before so that we have compile checks.
 */
public static FileIO loadFileIO(TableMetadata tableMetadata, Configuration conf) {
  if (tableMetadata != null) {
    Map<String, String> properties = tableMetadata.properties();
Looks like we only need properties here. I think that we should probably pass a map of properties to this method instead. That way changes to table metadata won't need to affect this class.
DynConstructors.Ctor<FileIO> ctor;
try {
  ctor = DynConstructors.builder(FileIO.class)
      .impl(impl, Configuration.class)
I think it would make sense to add an alternative constructor that accepts Map to pass the table properties. We can do this in a follow-up when it's needed, or here if you agree.
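A rough sketch of what that might look like (an assumption, not the follow-up itself): DynConstructors can register several candidate constructors, and buildChecked() uses the first one that exists. The call site would still need to know which signature matched when calling newInstance.

// Hypothetical: accept either FileIO(Map<String, String> properties) or
// FileIO(Configuration conf); the first matching constructor wins.
DynConstructors.Ctor<FileIO> ctor = DynConstructors.builder(FileIO.class)
    .impl(impl, Map.class)
    .impl(impl, Configuration.class)
    .buildChecked();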
} catch (ClassCastException e) {
  throw new IllegalArgumentException(
      String.format("Cannot initialize FileIO, fail to cast %s to class %s.",
          impl, FileIO.class), e);
Style: does this need to be on a separate line?
public static final String WRITE_LOCATION_PROVIDER_IMPL = "write.location-provider.impl";

public static final String WRITE_FILE_IO_IMPL = "write.file-io.impl";
This isn't just a write option. It controls how files are read as well.
How about using an io namespace instead? Maybe io.file-io.impl or io.impl?
Yeah, I used write because there is no namespace for both read and write. I can change it to io.file-io.impl; io.impl seems too generic in case we want to add something else to the same namespace in the future.
if (fileIO == null) {
  fileIO = new HadoopFileIO(conf);
  // avoid refresh metadata because refresh() calls io() again
  fileIO = CatalogUtil.loadFileIO(getCurrentMetadataNoRefresh(), conf);
What do you think about moving this to the base class? Then we wouldn't need to add the "no refresh" method to get metadata and could access current directly. This PR also makes it far less likely that implementations will override io, since a new one can be plugged in easily through table properties.
I thought about doing that, but (1) if it is put in the base class, the logic still needs to be duplicated, because HadoopTableOperations does not extend that base class; and (2) the method needs to take the Hadoop Configuration object, and it seems like the base class tries not to assume that an implementation must use a Hadoop configuration.
Good point about Configuration. We don't want to rely on it in the base class. Let me take a closer look at this problem; it's concerning that there's a loop.
Yeah, I am also thinking about the loop issue. If someone tries to create a new table, currentMetadata is null at that point, so it will use the default FileIO to write the initial metadata, and the new FileIO might not be able to read it, which would be a problem.
That means that when creating the table, it needs to check whether io.file-io.impl is set in the properties and update the FileIO implementation accordingly in the doCommit method. But it feels like an ugly hack...
Yeah, and we have to instantiate FileIO before we load the metadata for a table, so we don't really know if we're going to get it right. Seems like we are trying to configure this in the wrong place. Let's talk about where this should go in the sync today.
Because the Iceberg catalog does not enforce the implementation detail of how table properties are stored, an implementation can potentially write the table properties using io(). Therefore, if we try to determine the io() to use based on a table property, there is always a chance of a cyclic dependency in the logic. HadoopCatalog has exactly this problem.
Here are 3 potential ways to solve this problem:
1. Create a new constructor for each TableOperations that accepts FileIO as an argument. If that constructor is used, then FileIO is set at construction time instead of the first time io() is called. A default implementation can be loaded based on namespace properties. Engines like Spark and Flink can load a configuration key from the Hadoop config to load the FileIO outside the core logic. This requires code changes in multiple places, including (1) adding new constructors in existing table operations, and (2) adding support in each engine separately. (A rough sketch of this option follows the list.)
2. Determine FileIO based on the warehouse path scheme. For example, s3:// always uses S3FileIO and hdfs:// always uses HadoopFileIO. However, it is easy to create a cyclic dependency issue: for example, the iceberg-aws module depends on iceberg-core, so HadoopCatalog in iceberg-core cannot import S3FileIO from iceberg-aws. This means we need a registry that allows people to register custom IO mappings at runtime. This approach takes a similar amount of work as approach 1, because we need code changes in each existing catalog, and each engine needs a way to register its FileIO-implementation-to-scheme mappings at JVM start time.
3. Load FileIO using Hadoop configuration. Because HadoopFileIO is always the default implementation, a Hadoop configuration object is always passed in, so users can define a custom implementation in core-site.xml. This is a simple solution with no dependency concerns. However, it is not an elegant option, because ideally we would like to load all Iceberg classes using Iceberg-based configs such as table properties.
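As a rough illustration of option 1 only (the class below is hypothetical and does not implement the real TableOperations interface; it just shows where FileIO would be injected):

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.FileIO;

class ExampleTableOperations {
  private final FileIO io;

  // current behavior: default to HadoopFileIO when the catalog does not supply one
  ExampleTableOperations(Configuration conf) {
    this(new HadoopFileIO(conf));
  }

  // option 1: the catalog chooses the FileIO and injects it at construction time,
  // so io() never has to consult table properties and the cycle disappears
  ExampleTableOperations(FileIO io) {
    this.io = io;
  }

  FileIO io() {
    return io;
  }
}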
One quick thing to note: we want to avoid increasing dependence on a Hadoop configuration. It's fine to pass one where it is required, but we should always make sure there is an alternative and should generally avoid using config from it.
Thanks for writing up the options. Sounds like we have three options for configuring a FileIO:
- Pass FileIO in from the catalog and use catalog config to initialize it
- Instantiate FileIO based on table location or metadata path just before using it
- Use config from the environment, like a Hadoop FileSystem
I think the best choice is #1, catalog-level configuration.
We can't use table-level config because that creates a situation where it isn't available to load a table. Working around this requires deciding which FileIO to load based on different information at different times (location for create, metadata location for load), and would also make supplying a custom FileIO implementation in configuration difficult.
Environment-level config doesn't fit with the current model for customizing behavior, where everything is injected through Catalog and Table. Plus, Iceberg has no environment config like Hadoop Configuration and I don't think it is a good idea to add it. I think the most sensible thing is to maintain the chain of configuration: application passes config to catalogs, catalogs pass config to tables.
I don't think it would be too difficult to change over to FileIO passed in from the catalog, but this would mean not basing the implementation on table path. We wouldn't know to choose between S3FileIO or HadoopFileIO for a table and would have to use one for all tables from a catalog, or build a FileIO that knows when to choose between the two. I was already thinking that we would need a delegating implementation, since S3FileIO can't handle non-S3 paths. That should be okay.
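A very rough sketch of what such a delegating FileIO could look like, assuming only the three path-based FileIO methods need to be implemented (the class name and scheme check are made up for illustration):

import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.InputFile;
import org.apache.iceberg.io.OutputFile;

class SchemeDelegatingFileIO implements FileIO {
  private final FileIO s3Io;      // e.g. S3FileIO, supplied by the catalog
  private final FileIO defaultIo; // e.g. HadoopFileIO

  SchemeDelegatingFileIO(FileIO s3Io, FileIO defaultIo) {
    this.s3Io = s3Io;
    this.defaultIo = defaultIo;
  }

  private FileIO forPath(String path) {
    // naive scheme check, for illustration only
    return path.startsWith("s3:") || path.startsWith("s3a:") ? s3Io : defaultIo;
  }

  @Override
  public InputFile newInputFile(String path) {
    return forPath(path).newInputFile(path);
  }

  @Override
  public OutputFile newOutputFile(String path) {
    return forPath(path).newOutputFile(path);
  }

  @Override
  public void deleteFile(String path) {
    forPath(path).deleteFile(path);
  }
}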
@rdblue So in the latest version, I have added a nullable field …
public static final String ENGINE_HIVE_ENABLED = "iceberg.engine.hive.enabled";

public static final String CUSTOM_FILE_IO_IMPL = "iceberg.fileio.impl";
Why do we need a Hadoop config property?
I see this is used to get the implementation class. Iceberg should never use a Hadoop Configuration for config, except when integrating with an engine that uses it for config. It's okay to store configuration for Hive, but not for Iceberg core.
For FileIO implementation, I think the config should come from the catalog property map.
Thank you for the comments! I will fix the other parts accordingly. This is the part I do not fully understand: is there a catalog property map? I don't see such a thing in the catalog interface, and that is why I had to use the Hadoop config.
There are config properties from both Flink and Spark that we use when constructing the Hive and Hadoop catalogs. And for #1640, we are talking about using an initialization method to set the config using a string map (and a catalog name). While these classes don't pull their own config out of a map, I think of the config as coming from a map of config properties.
Sorry for the confusion. I think we should configure this however the catalog is configured, which will very likely be a string map passed to an init method in the future.
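For context, a sketch of the shape such an initialization hook might take (purely hypothetical; #1640 tracks the actual design):

import java.util.Map;

// Hypothetical: catalog configuration arrives as a plain string map plus a name,
// so a custom FileIO implementation class could simply be one of the map entries.
interface ConfigurableCatalog {
  void initialize(String name, Map<String, String> properties);
}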
this.conf = conf;
this.createStack = Thread.currentThread().getStackTrace();
this.closed = false;
this.defaultFileIO = new HadoopFileIO(conf);
Is HadoopFileIO instantiated before initialize is called because initialize isn't always called right now?
Configuration hadoopConf,
String warehouseLocation,
Map<String, String> properties) {
  return new HadoopCatalogLoader(name, hadoopConf, warehouseLocation, properties);
For Flink, I think we should pass the warehouse location and other config through properties. Right now, we pull it out in FlinkCatalogFactory, but it doesn't make much sense to pull out only some properties.
How about leaving the current CatalogLoader.hadoop and CatalogLoader.hive as they are and adding variants like CatalogLoader.hadoop(String name, Map<String, String> properties, Configuration conf)? Then we can pull the properties out using standard config properties that we put in CatalogProperties in the loader classes.
FYI @JingsongLi and @openinx: we're improving how we configure catalogs and allowing some parts to be loaded dynamically. The main change here is passing properties to the catalog as a map.
I only see CatalogLoader.hadoop and CatalogLoader.hive used in a single place, FlinkCatalogFactory.createCatalogLoader, which is why I directly changed the class signature. I don't know if there is any benefit in keeping the old ones.
Changing the signature to CatalogLoader.hadoop(String name, Map<String, String> properties, Configuration conf) looks like a good idea that simplifies the code; let me do that first.
(Speaking of this, I should also add a null check for the properties map and give a fixed serialization id to those classes, because Flink serializes the catalog loader.)
You're right. Since these are package-private, I don't think we need to maintain them. I was thinking that people would call these from the public API to configure the source and sink builders, but these aren't public.
We may still want to keep them to avoid changing lots of files in this PR, and I still think it is a good idea to pull out the config here, rather than in FlinkCatalogFactory because we want to move toward catalogs interpreting their own property maps.
> also give a fixed serialization id for those classes because Flink serializes the catalog loader
We know that serialization across Iceberg versions may be a problem, but I'm not sure that we want to introduce a serialization id. In general, we avoid this because we want serialization to fail if there are multiple library versions. Java serialization isn't something that we want to use for compatibility across versions. The cases where we have multiple library versions are rare, and we want to design something for a rolling update.
String warehouse,
int clientPoolSize,
Configuration conf,
Map<String, String> properties) {
Nit: we prefer args on as few lines as possible, rather than this style.
I see, sorry I did not see the comments in time to change it before the merge. I guess this kind of issue will become less frequent as I get more familiar with the code style here, but feel free to ping me at any time and I can fix all the style issues before merge 😃
No problem! I made this comment just before I merged it. Commits don't need to be perfect. I just wanted to let you know for next time.
public static final String HIVE_URI = "uri";
public static final String HIVE_CLIENT_POOL_SIZE = "clients";
public static final String HIVE_CONF_DIR = "hive-conf-dir";
public static final String WAREHOUSE_LOCATION = "warehouse";
Nit: leaving these in place would have reduced the number of files this needed to touch, and avoided a possible problem from removing public fields. I don't think it's worth blocking the change for this, but we like to keep patches as small as possible by not breaking references like these.